package com.ruoyi.ghxx.mqtt;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Controller;

import javax.annotation.PostConstruct;
import java.util.Date;
import java.util.List;

/**
 */
@Controller
public class MyMqttClient {
    @Value("${ruoyi.HOST}")
    private String HOST;
    @Value("${ruoyi.clientID}")
    private String clientID;
    @Value("${ruoyi.userName}")
    private String userName;
    @Value("${ruoyi.passWord}")
    private String passWord;
    private MqttClient client;
    private static volatile MyMqttClient mqttClient = null;
    public static MyMqttClient getInstance() {
        if (mqttClient == null) {
            synchronized (MyMqttClient.class) {
                if (mqttClient == null) {
                    mqttClient = new MyMqttClient();
                }
            }
        }
        return mqttClient;
    }

    /**
     * 创建连接,@PostConstruct注入到容器中
     */
    @PostConstruct
    public void connect() {
        mqttClient = this;
        try {
            System.out.println("---------------MQTT服务开始启动，服务器为："+HOST+"---------------");
            //初始化连接设置对象
            MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
            //true可以安全地使用内存持久性作为客户端断开连接时清除的所有状态
            mqttConnectOptions.setCleanSession(true);
            mqttConnectOptions.setConnectionTimeout(1000);//设置连接超时
            mqttConnectOptions.setUserName(userName); // 设置连接的用户名
            mqttConnectOptions.setPassword(passWord.toCharArray());// 设置连接的密码
            mqttConnectOptions.setKeepAliveInterval(120);
            client = new MqttClient(HOST,clientID+"&"+new Date().getTime(), new MemoryPersistence());
            client.setCallback(new MqttRecieveCallback());//执行回调
            client.connect(mqttConnectOptions);//创建连接
            System.out.println("---------------MQTT服务启动成功，服务器为："+HOST+"---------------");
        } catch (Exception e) {
            e.printStackTrace();
        }

    }

    /**
     * 订阅主题
     *
     * @param topic qos默认为1
     */
    public void subTopic(String topic) {
        try {
            client.subscribe(topic, 1);
        } catch (MqttException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

    /**
     * 订阅某个主题
     *
     * @param topic
     * @param gos
     */
    public void subTopic(String topic, int gos) {
        try {
            client.subscribe(topic, gos);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    /**
     * 清空主题
     * @param topic
     */
    public void cleanTopic(String topic) {
        try {
            client.unsubscribe(topic);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
    /**
     * 发送主题消息
     * @param topic
     */
    public void sendMessage(String topic, byte[] payload, int qos, boolean retained) {
        try {
            client.publish(topic,payload,qos,retained);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
}
